<!--
 Licensed to the Apache Software Foundation (ASF) under one or more
 contributor license agreements.  See the NOTICE file distributed with
 this work for additional information regarding copyright ownership.
 The ASF licenses this file to You under the Apache License, Version 2.0
 (the "License"); you may not use this file except in compliance with
 the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-->

<script><!--#include virtual="../../js/templateData.js" --></script>

<script id="content-template" type="text/x-handlebars-template">
    <!-- h1>Developer Guide for Kafka Streams</h1 -->
    <div class="sub-nav-sticky">
        <div class="sticky-top">
            <!-- div style="height:35px">
              <a href="/{{version}}/documentation/streams/">Introduction</a>
              <a class="active-menu-item" href="/{{version}}/documentation/streams/developer-guide">Developer Guide</a>
              <a href="/{{version}}/documentation/streams/core-concepts">Concepts</a>
              <a href="/{{version}}/documentation/streams/quickstart">Run Demo App</a>
              <a href="/{{version}}/documentation/streams/tutorial">Tutorial: Write App</a>
            </div -->
        </div>
    </div>

    <div class="section" id="testing">
        <span id="streams-developer-guide-testing"></span>
        <h1>Testing Kafka Streams<a class="headerlink" href="#testing" title="Permalink to this headline"></a></h1>
        <div class="contents local topic" id="table-of-contents">
            <p class="topic-title first"><b>Table of Contents</b></p>
            <ul class="simple">
                <li><a class="reference internal" href="#test-utils-artifact">Importing the test utilities</a></li>
                <li><a class="reference internal" href="#testing-topologytestdriver">Testing Streams applications</a>
                </li>
                <li><a class="reference internal" href="#unit-testing-processors">Unit testing Processors</a>
                </li>
            </ul>
        </div>
        <div class="section" id="test-utils-artifact">
            <h2><a class="toc-backref" href="#test-utils-artifact" title="Permalink to this headline">Importing the test
                utilities</a></h2>
            <p>
                To test a Kafka Streams application, Kafka provides a test-utils artifact that can be added as regular
                dependency to your test code base. Example <code>pom.xml</code> snippet when using Maven:
            </p>
            <pre>
&lt;dependency&gt;
    &lt;groupId&gt;org.apache.kafka&lt;/groupId&gt;
    &lt;artifactId&gt;kafka-streams-test-utils&lt;/artifactId&gt;
    &lt;version&gt;{{fullDotVersion}}&lt;/version&gt;
    &lt;scope&gt;test&lt;/scope&gt;
&lt;/dependency&gt;
    </pre>
        </div>
        <div class="section" id="testing-topologytestdriver">
            <h2><a class="toc-backref" href="#testing-topologytestdriver" title="Permalink to this headline">Testing a
                Streams application</a></h2>

            <p>
                The test-utils package provides a <code>TopologyTestDriver</code> that can be used pipe data through a
                <code>Topology</code> that is either assembled manually
                using Processor API or via the DSL using <code>StreamsBuilder</code>.
                The test driver simulates the library runtime that continuously fetches records from input topics and
                processes them by traversing the topology.
                You can use the test driver to verify that your specified processor topology computes the correct result
                with the manually piped in data records.
                The test driver captures the results records and allows to query its embedded state stores.
            <pre>
// Processor API
Topology topology = new Topology();
topology.addSource("sourceProcessor", "input-topic");
topology.addProcessor("processor", ..., "sourceProcessor");
topology.addSink("sinkProcessor", "output-topic", "processor");
// or
// using DSL
StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic").filter(...).to("output-topic");
Topology topology = builder.build();

// setup test driver
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);
        </pre>
            <p>
                With the test driver you can create <code>TestInputTopic</code> giving topic name and the corresponding serializers.
                <code>TestInputTopic</code> provides various methods to pipe new message values, keys and values, or list of KeyValue objects.
            </p>
            <pre>
TestInputTopic&lt;String, Long&gt; inputTopic = testDriver.createInputTopic("input-topic", stringSerde.serializer(), longSerde.serializer());
inputTopic.pipeInput("key", 42L);
        </pre>
            <p>
                To verify the output, you can use <code>TestOutputTopic</code>
                where you configure the topic and the corresponding deserializers during initialization.
                It offers helper methods to read only certain parts of the result records or the collection of records.
                For example, you can validate returned <code>KeyValue</code> with standard assertions
                if you only care about the key and value, but not the timestamp of the result record.
            </p>
            <pre>
TestOutputTopic&lt;String, Long&gt; outputTopic = testDriver.createOutputTopic("result-topic", stringSerde.deserializer(), longSerde.deserializer());
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue&lt;&gt;("key", 42L)));
        </pre>
            <p>
                <code>TopologyTestDriver</code> supports punctuations, too.
                Event-time punctuations are triggered automatically based on the processed records' timestamps.
                Wall-clock-time punctuations can also be triggered by advancing the test driver's wall-clock-time (the
                driver mocks wall-clock-time internally to give users control over it).
            </p>
            <pre>
testDriver.advanceWallClockTime(Duration.ofSeconds(20));
        </pre>
            <p>
                Additionally, you can access state stores via the test driver before or after a test.
                Accessing stores before a test is useful to pre-populate a store with some initial values.
                After data was processed, expected updates to the store can be verified.
            </p>
            <pre>
KeyValueStore store = testDriver.getKeyValueStore("store-name");
        </pre>
            <p>
                Note, that you should always close the test driver at the end to make sure all resources are release
                properly.
            </p>
            <pre>
testDriver.close();
        </pre>

            <h3>Example</h3>
            <p>
                The following example demonstrates how to use the test driver and helper classes.
                The example creates a topology that computes the maximum value per key using a key-value-store.
                While processing, no output is generated, but only the store is updated.
                Output is only sent downstream based on event-time and wall-clock punctuations.
            </p>
            <pre>
private TopologyTestDriver testDriver;
private TestInputTopic&lt;String, Long&gt; inputTopic;
private TestOutputTopic&lt;String, Long&gt; outputTopic;
private KeyValueStore&lt;String, Long&gt; store;

private Serde&lt;String&gt; stringSerde = new Serdes.StringSerde();
private Serde&lt;Long&gt; longSerde = new Serdes.LongSerde();

@Before
public void setup() {
    Topology topology = new Topology();
    topology.addSource("sourceProcessor", "input-topic");
    topology.addProcessor("aggregator", new CustomMaxAggregatorSupplier(), "sourceProcessor");
    topology.addStateStore(
        Stores.keyValueStoreBuilder(
            Stores.inMemoryKeyValueStore("aggStore"),
            Serdes.String(),
            Serdes.Long()).withLoggingDisabled(), // need to disable logging to allow store pre-populating
        "aggregator");
    topology.addSink("sinkProcessor", "result-topic", "aggregator");

    // setup test driver
    Properties props = new Properties();
    props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "maxAggregation");
    props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
    props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
    testDriver = new TopologyTestDriver(topology, props);

    // setup test topics
    inputTopic = testDriver.createInputTopic("input-topic", stringSerde.serializer(), longSerde.serializer());
    outputTopic = testDriver.createOutputTopic("result-topic", stringSerde.deserializer(), longSerde.deserializer());

    // pre-populate store
    store = testDriver.getKeyValueStore("aggStore");
    store.put("a", 21L);
}

@After
public void tearDown() {
    testDriver.close();
}

@Test
public void shouldFlushStoreForFirstInput() {
    inputTopic.pipeInput("a", 1L);
    assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue&lt;&gt;("a", 21L)));
    assertThat(outputTopic.isEmpty(), is(true));
}

@Test
public void shouldNotUpdateStoreForSmallerValue() {
    inputTopic.pipeInput("a", 1L);
    assertThat(store.get("a"), equalTo(21L));
    assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue&lt;&gt;("a", 21L)));
    assertThat(outputTopic.isEmpty(), is(true));
}

@Test
public void shouldNotUpdateStoreForLargerValue() {
    inputTopic.pipeInput("a", 42L);
    assertThat(store.get("a"), equalTo(42L));
    assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue&lt;&gt;("a", 42L)));
    assertThat(outputTopic.isEmpty(), is(true));
}

@Test
public void shouldUpdateStoreForNewKey() {
    inputTopic.pipeInput("b", 21L);
    assertThat(store.get("b"), equalTo(21L));
    assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue&lt;&gt;("a", 21L)));
    assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue&lt;&gt;("b", 21L)));
    assertThat(outputTopic.isEmpty(), is(true));
}

@Test
public void shouldPunctuateIfEvenTimeAdvances() {
    final Instant recordTime = Instant.now();
    inputTopic.pipeInput("a", 1L,  recordTime);
    assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue&lt;&gt;("a", 21L)));

    inputTopic.pipeInput("a", 1L,  recordTime);
    assertThat(outputTopic.isEmpty(), is(true));

    inputTopic.pipeInput("a", 1L, recordTime.plusSeconds(10L));
    assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue&lt;&gt;("a", 21L)));
    assertThat(outputTopic.isEmpty(), is(true));
}

@Test
public void shouldPunctuateIfWallClockTimeAdvances() {
    testDriver.advanceWallClockTime(Duration.ofSeconds(60));
    assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue&lt;&gt;("a", 21L)));
    assertThat(outputTopic.isEmpty(), is(true));
}

public class CustomMaxAggregatorSupplier implements ProcessorSupplier&lt;String, Long&gt; {
    @Override
    public Processor&lt;String, Long&gt; get() {
        return new CustomMaxAggregator();
    }
}

public class CustomMaxAggregator implements Processor&lt;String, Long&gt; {
    ProcessorContext context;
    private KeyValueStore&lt;String, Long&gt; store;

    @SuppressWarnings("unchecked")
    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        context.schedule(Duration.ofSeconds(60), PunctuationType.WALL_CLOCK_TIME, time -&gt; flushStore());
        context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, time -&gt; flushStore());
        store = (KeyValueStore&lt;String, Long&gt;) context.getStateStore("aggStore");
    }

    @Override
    public void process(String key, Long value) {
        Long oldValue = store.get(key);
        if (oldValue == null || value &gt; oldValue) {
            store.put(key, value);
        }
    }

    private void flushStore() {
        KeyValueIterator&lt;String, Long&gt; it = store.all();
        while (it.hasNext()) {
            KeyValue&lt;String, Long&gt; next = it.next();
            context.forward(next.key, next.value);
        }
    }

    @Override
    public void close() {}
}
        </pre>
        </div>
        <div class="section" id="unit-testing-processors">
            <h2>
                <a class="headerlink" href="#unit-testing-processors"
                   title="Permalink to this headline">Unit Testing Processors</a>
            </h2>
            <p>
                If you <a href="processor-api.html">write a Processor</a>, you will want to test it.
            </p>
            <p>
                Because the <code>Processor</code> forwards its results to the context rather than returning them,
                Unit testing requires a mocked context capable of capturing forwarded data for inspection.
                For this reason, we provide a <code>MockProcessorContext</code> in <a href="#test-utils-artifact"><code>test-utils</code></a>.
            </p>
            <b>Construction</b>
            <p>
                To begin with, instantiate your processor and initialize it with the mock context:
            <pre>
final Processor processorUnderTest = ...;
final MockProcessorContext context = new MockProcessorContext();
processorUnderTest.init(context);
                </pre>
            If you need to pass configuration to your processor or set the default serdes, you can create the mock with
            config:
            <pre>
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "unit-test");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
props.put("some.other.config", "some config value");
final MockProcessorContext context = new MockProcessorContext(props);
                </pre>
            </p>
            <b>Captured data</b>
            <p>
                The mock will capture any values that your processor forwards. You can make assertions on them:
            <pre>
processorUnderTest.process("key", "value");

final Iterator&lt;CapturedForward&gt; forwarded = context.forwarded().iterator();
assertEquals(forwarded.next().keyValue(), new KeyValue&lt;&gt;(..., ...));
assertFalse(forwarded.hasNext());

// you can reset forwards to clear the captured data. This may be helpful in constructing longer scenarios.
context.resetForwards();

assertEquals(context.forwarded().size(), 0);
            </pre>
            If your processor forwards to specific child processors, you can query the context for captured data by
            child name:
            <pre>
final List&lt;CapturedForward&gt; captures = context.forwarded("childProcessorName");
            </pre>
            The mock also captures whether your processor has called <code>commit()</code> on the context:
            <pre>
assertTrue(context.committed());

// commit captures can also be reset.
context.resetCommit();

assertFalse(context.committed());
            </pre>
            </p>
            <b>Setting record metadata</b>
            <p>
                In case your processor logic depends on the record metadata (topic, partition, offset, or timestamp),
                you can set them on the context, either all together or individually:
            <pre>
context.setRecordMetadata("topicName", /*partition*/ 0, /*offset*/ 0L, /*timestamp*/ 0L);
context.setTopic("topicName");
context.setPartition(0);
context.setOffset(0L);
context.setTimestamp(0L);
                </pre>
            Once these are set, the context will continue returning the same values, until you set new ones.
            </p>
            <b>State stores</b>
            <p>
                In case your punctuator is stateful, the mock context allows you to register state stores.
                You're encouraged to use a simple in-memory store of the appropriate type (KeyValue, Windowed, or
                Session), since the mock context does <i>not</i> manage changelogs, state directories, etc.
            </p>
            <pre>
final KeyValueStore&lt;String, Integer&gt; store =
    Stores.keyValueStoreBuilder(
            Stores.inMemoryKeyValueStore("myStore"),
            Serdes.String(),
            Serdes.Integer()
        )
        .withLoggingDisabled() // Changelog is not supported by MockProcessorContext.
        .build();
store.init(context, store);
context.register(store, /*deprecated parameter*/ false, /*parameter unused in mock*/ null);
            </pre>
            <b>Verifying punctuators</b>
            <p>
                Processors can schedule punctuators to handle periodic tasks.
                The mock context does <i>not</i> automatically execute punctuators, but it does capture them to
                allow you to unit test them as well:
            <pre>
final MockProcessorContext.CapturedPunctuator capturedPunctuator = context.scheduledPunctuators().get(0);
final long interval = capturedPunctuator.getIntervalMs();
final PunctuationType type = capturedPunctuator.getType();
final boolean cancelled = capturedPunctuator.cancelled();
final Punctuator punctuator = capturedPunctuator.getPunctuator();
punctuator.punctuate(/*timestamp*/ 0L);
                </pre>
            If you need to write tests involving automatic firing of scheduled punctuators, we recommend creating a
            simple topology with your processor and using the <a href="testing.html#testing-topologytestdriver"><code>TopologyTestDriver</code></a>.
            </p>
        </div>
    </div>
    <div class="pagination">
        <a href="/{{version}}/documentation/streams/developer-guide/datatypes" class="pagination__btn pagination__btn__prev">Previous</a>
        <a href="/{{version}}/documentation/streams/developer-guide/interactive-queries" class="pagination__btn pagination__btn__next">Next</a>
    </div>
</script>

<!--#include virtual="../../../includes/_header.htm" -->
<!--#include virtual="../../../includes/_top.htm" -->
<div class="content documentation documentation--current">
    <!--#include virtual="../../../includes/_nav.htm" -->
    <div class="right">
        <!--#include virtual="../../../includes/_docs_banner.htm" -->
        <ul class="breadcrumbs">
            <li><a href="/documentation">Documentation</a></li>
            <li><a href="/documentation/streams">Kafka Streams</a></li>
            <li><a href="/documentation/streams/developer-guide/">Developer Guide</a></li>
        </ul>
        <div class="p-content"></div>
    </div>
</div>
<!--#include virtual="../../../includes/_footer.htm" -->
<script>
    $(function () {
        // Show selected style on nav item
        $('.b-nav__streams').addClass('selected');

        //sticky secondary nav
        var $navbar = $(".sub-nav-sticky"),
            y_pos = $navbar.offset().top,
            height = $navbar.height();

        $(window).scroll(function () {
            var scrollTop = $(window).scrollTop();

            if (scrollTop > y_pos - height) {
                $navbar.addClass("navbar-fixed")
            } else if (scrollTop <= y_pos) {
                $navbar.removeClass("navbar-fixed")
            }
        });

        // Display docs subnav items
        $('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
    });
</script>
